Python: Add Avro read path #4920
Conversation
Force-pushed 970872f to 730486a
Reads an Avro file by first reading the headers and extracting the schema. Then we convert the Avro schema into an Iceberg schema, and read the actual binary using the schema visitor.
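A rough, self-contained sketch of that flow (not the PR's actual classes; helper names here are illustrative assumptions): parse the Avro container-file header, pull the writer schema out of the "avro.schema" metadata entry, and stop right before the data blocks.

from typing import BinaryIO, Dict, Tuple

MAGIC = b"Obj\x01"
SYNC_SIZE = 16


def _read_long(stream: BinaryIO) -> int:
    """Decode one zigzag/varint-encoded long, the integer encoding Avro uses throughout."""
    byte = stream.read(1)[0]
    result = byte & 0x7F
    shift = 7
    while byte & 0x80:
        byte = stream.read(1)[0]
        result |= (byte & 0x7F) << shift
        shift += 7
    return (result >> 1) ^ -(result & 1)


def _read_bytes(stream: BinaryIO) -> bytes:
    return stream.read(_read_long(stream))


def read_header(stream: BinaryIO) -> Tuple[Dict[str, bytes], bytes]:
    """Return the file metadata map and the 16-byte sync marker that precedes the data blocks."""
    if stream.read(4) != MAGIC:
        raise ValueError("Not an Avro container file")
    meta: Dict[str, bytes] = {}
    count = _read_long(stream)
    while count != 0:
        if count < 0:  # a negative count is followed by the block size in bytes
            _read_long(stream)
            count = -count
        for _ in range(count):
            key = _read_bytes(stream).decode("utf-8")
            meta[key] = _read_bytes(stream)
        count = _read_long(stream)
    return meta, stream.read(SYNC_SIZE)


# Usage: json.loads(meta["avro.schema"]) gives the writer schema, which is then converted
# to an Iceberg schema and drives the schema-visitor based record reader.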
samredai left a comment:
This is awesome @Fokko! I left some comments. For some of the comments in src/iceberg/avro/, I know we're vendoring some of that so please feel free to ignore any nit/style comments there. I'm super excited that we'll have a read path that utilizes all standard lib stuff. 😄
| assert "Unknown logical/physical type combination:" in str(exc_info.value) | ||
|
|
||
|
|
||
| def test_logical_map_with_invalid_fields(): |
I see a pattern here so we may be able to consolidate these into a handful of parametrized tests, making them easily extendable too. How about making a single function per method that's being tested?
- AvroSchemaConversion()._convert_logical_type
- AvroSchemaConversion()._convert_logical_map_type
- AvroSchemaConversion()._convert_schema
- AvroSchemaConversion()._convert_field
- AvroSchemaConversion()._convert_record_type
- AvroSchemaConversion()._convert_array_type
As an example, for AvroSchemaConversion()._convert_logical_type:
@pytest.mark.parametrize(
    "avro_logical_type,expected",
    [
        ({"type": "int", "logicalType": "date"}, DateType()),
        (
            {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25},
            DecimalType(precision=19, scale=25),
        ),
        ...,
    ],
)
def test_schema_conversion_convert_logical_type(avro_logical_type, expected):
    assert AvroSchemaConversion()._convert_logical_type(avro_logical_type) == expected
Personally, I'm not a fan of parameterized tests:
- I find them hard to read with all the (curly) braces
- If you have many tests, and the last one is failing, you have to rerun the earlier ones all the time, which is kind of annoying if you have breakpoints everywhere.
- The parameterized arguments are evaluated every time, even if you run an unrelated test.
I don't mind a few additional tests.
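For contrast, a hypothetical example of the plain per-case style preferred here (imports omitted, matching the example above; the expected conversions mirror the parametrized cases):

def test_convert_date_logical_type():
    # one plain test per case instead of one parametrized function
    assert AvroSchemaConversion()._convert_logical_type({"type": "int", "logicalType": "date"}) == DateType()


def test_convert_decimal_logical_type():
    avro_type = {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25}
    assert AvroSchemaConversion()._convert_logical_type(avro_type) == DecimalType(precision=19, scale=25)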
python/src/iceberg/avro/reader.py
Outdated
        return self._data[pos]

class _AvroReader(SchemaVisitor[Union[AvroStructProtocol, Any]]):
It looks like this calls a decoder when it visits a schema, but I was expecting an implementation that creates a reader tree that accepts a decoder, like the other one. Why did you decide to go with this approach over the other one?
Hey @rdblue. This made the most sense to me at the time. Which one is the other one? I don't have a strong opinion one way or the other. Probably you have done this more often than me :)
I suggested the other approach for a few reasons. First, I think when it is reasonable to match the approach taken by the Java codebase, that's a good idea. That way we don't have completely different implementations to validate and maintain.
Second, this approach traverses the schema for each record read. That is inefficient compared to building a tree of readers that handle this. The reader tree approach allows us to create basically a state machine that is ready to read records.
Last, this is harder to update. The next step is to reconcile the differences between the read and write schemas. Doing that for every record is hard to write with this approach. Same thing with reusing structs, dicts, and lists.
I've refactored the code and added a reader tree. I love the approach because it gives a much nicer decoupling between the actual reading and the schema. This will make the reader/writer schema much easier to implement.
I kept it as simple as possible to not prematurely optimize the code and keep the PR a bit more concise. We can add things like reusing structs, dicts and lists later on.
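For illustration, a minimal sketch of the reader-tree shape being discussed (class names and the decoder methods read_int/read_utf8 are assumptions, not the PR's actual API): the schema is visited once to build the tree, and every record is then read by walking the tree rather than re-traversing the schema.

from abc import ABC, abstractmethod
from typing import Any, List


class Reader(ABC):
    """A node in the reader tree; built once from the schema and reused for every record."""

    @abstractmethod
    def read(self, decoder: Any) -> Any:
        ...


class IntegerReader(Reader):
    def read(self, decoder: Any) -> int:
        return decoder.read_int()


class StringReader(Reader):
    def read(self, decoder: Any) -> str:
        return decoder.read_utf8()


class StructReader(Reader):
    def __init__(self, field_readers: List[Reader]) -> None:
        self.field_readers = field_readers

    def read(self, decoder: Any) -> List[Any]:
        # each field reader consumes its own slice of the record from the decoder
        return [reader.read(decoder) for reader in self.field_readers]


# A schema visitor would assemble e.g. StructReader([IntegerReader(), StringReader()]) once,
# after which the per-record loop only calls tree.read(decoder).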
python/src/iceberg/avro/codec.py
Outdated
    @staticmethod
    def decompress(readers_decoder: BinaryDecoder) -> BinaryDecoder:
        _ = readers_decoder.read_long()
Compression stores the length even if it is uncompressed?
I had to check, and this is the case. I've added a test with a snappy and a null-codec.
This is actually an artifact of how the decoder is used. Since the block length is right before the block, the decoder does the right thing when you call read_bytes. But in this case there's no need to create a new decoder. It just needs to consume the length bytes and return the existing one. It makes sense, although I do think it is a bit strange to couple the compression API with decoders.
Ah, you're right and this doesn't look good. This is actually a bug. I'll refactor this, including passing the bytes to the compression API instead of the decoder, which indeed doesn't make much sense.
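For reference, a rough sketch of what a bytes-in/bytes-out compression API could look like after that refactor (illustrative only, not the final code; the snappy handling assumes the 4-byte CRC32 suffix Avro appends to snappy blocks):

from abc import ABC, abstractmethod


class Codec(ABC):
    @staticmethod
    @abstractmethod
    def decompress(data: bytes) -> bytes:
        ...


class NullCodec(Codec):
    @staticmethod
    def decompress(data: bytes) -> bytes:
        # the "null" codec stores blocks uncompressed, so the bytes pass through untouched
        return data


class SnappyCodec(Codec):
    @staticmethod
    def decompress(data: bytes) -> bytes:
        import snappy  # third-party python-snappy package

        # Avro appends a 4-byte CRC32 of the uncompressed data to each snappy block;
        # a complete implementation would verify it rather than just drop it.
        return snappy.decompress(data[:-4])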
rdblue left a comment:
@Fokko, this is great! I made a few comments but it's really close.
python/src/iceberg/avro/decoder.py
Outdated
        the unix epoch, 1 January 1970 (ISO calendar).
        """
        days_since_epoch = self.read_int()
        return date(1970, 1, 1) + timedelta(days_since_epoch)
Minor: I'd prefer to use days_to_date just like this uses micros_to_time and micros_to_timestamp.
I like it 👍🏻 Updated
python/src/iceberg/utils/datetime.py
Outdated
def micros_to_timestamp(micros: int, tzinfo: timezone | None = None):
    dt = timedelta(microseconds=micros)
    unix_epoch_datetime = datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=tzinfo)
Can you use EPOCH_TIMESTAMP and EPOCH_TIMESTAMPTZ instead of creating a new datetime here?
def micros_to_timestamp(micros: int):
    return EPOCH_TIMESTAMP + timedelta(microseconds=micros)


def micros_to_timestamptz(micros: int):
    return EPOCH_TIMESTAMPTZ + timedelta(microseconds=micros)
Hey @rdblue, I've gone through the comments; the only one still open is #4920 (comment). Not sure if we want to break the protocol or make a helper class. Apart from that, I've added a lot of tests to do some checks and boost the coverage a bit. Let me know what you think!
from abc import ABC, abstractmethod


class Codec(ABC):
Minor: would it make sense to put this in the __init__.py file?
Some projects do this, and some don't :) For me, it makes sense to add base classes in the init. Mostly because they need to be loaded anyway, and by adding them to the __init__.py they are read when you access the module. Also, for the case of the codecs, this avoids having yet another file. WDYT?
+1 for adding it to __init__.py. Having fewer files cuts down on load time, and we will need to load it anyway.
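As a sketch of that layout (the package path is an assumption for illustration): the base class sits in the package's __init__.py, so it is loaded as soon as anything from the package is imported, and the concrete codecs can still live in their own modules.

# src/iceberg/avro/codecs/__init__.py  (assumed path)
from abc import ABC, abstractmethod


class Codec(ABC):
    """Base class for Avro block-compression codecs."""

    @staticmethod
    @abstractmethod
    def decompress(data: bytes) -> bytes:
        ...


# callers then only need:
# from iceberg.avro.codecs import Codec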
python/src/iceberg/avro/decoder.py
Outdated
        the unix epoch, 1 January 1970 (ISO calendar).
        """
        days_to_date = self.read_int()
        return date(1970, 1, 1) + timedelta(days_to_date)
What about adding days_to_date to datetime.py? Then you could use EPOCH_DATE and this would be reusable.
Ouch, that one slipped through the cracks. I just updated the code.
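For reference, a minimal sketch of the suggested helper in datetime.py, assuming an EPOCH_DATE constant alongside EPOCH_TIMESTAMP and EPOCH_TIMESTAMPTZ:

from datetime import date, timedelta

EPOCH_DATE = date(1970, 1, 1)


def days_to_date(days: int) -> date:
    """Convert a number of days since the unix epoch (1970-01-01) into a date."""
    return EPOCH_DATE + timedelta(days=days)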
    def __init__(self, input_file: InputFile) -> None:
        self.input_file = input_file

    def __enter__(self):
As a follow-up, I think we should start a base class for our readers that handles __iter__, __enter__, and __exit__. This should probably use threading.local() to ensure that with is thread-safe if the file is shared, and I think we can make __iter__ return an iterator class. But these all work great for now.
I'm happy to do this, but I would suggest doing it in a separate PR. We could also extend the Input- and OutputFile so that __enter__ calls open() or create(). Having a separate Iterator isn't pythonic.
With regard to the threading, I think that's another can of worms. At least as a start, reading a file should not be considered thread-safe, and we should not share the file across threads. I would rather suggest reading the files in parallel using something like multiprocessing. We could also split out the blocks in the Avro file if we like. Another option would be to make the read async, or we could go fancy and go for an async iterator. But looking at the size of this PR, I think we should split that out. Let me know if you feel otherwise.
Yeah, definitely as a separate PR. I didn't mean to suggest doing it now.
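For reference, a rough sketch of that follow-up idea (names and method split are assumptions, not a committed design): __enter__/__exit__ manage the underlying file, and __iter__ hands out records.

from abc import ABC, abstractmethod
from typing import Any, Iterator


class BaseFileReader(ABC):
    """Possible base class handling the context-manager and iteration plumbing."""

    def __enter__(self) -> "BaseFileReader":
        self._open()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self._close()

    def __iter__(self) -> Iterator[Any]:
        return self._records()

    @abstractmethod
    def _open(self) -> None:
        ...

    @abstractmethod
    def _close(self) -> None:
        ...

    @abstractmethod
    def _records(self) -> Iterator[Any]:
        ...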
Thanks, @Fokko! Everything looks great in here. Thanks for updating the tests. Looking forward to the next steps!
Reads an Avro file by first reading the headers and extracting the schema. Then we convert the Avro schema into an Iceberg schema, and read the actual binary using the schema visitor.
The binary decoder and codecs have been copied from apache/avro because I didn't want to depend on the library for just that. Also, apache/avro uses the general IO interface, while we have our own FileStream interface for reading files.
To not make the PR too big, I'm working on the follow-up PRs: